[t:/]$ 지식_

하둡 스트리밍으로 MR 짜기 hadoop streaming

2016/09/28

읽기 전에

이 몸은 자바, 빅데이터 생테계에 대해서 아는 것이 1도 없는 쪼렙입니다. 그냥 참고만 하시고, 불합리, 비효율, 등신같음에 대해서는 알아서 필터링 하시고 더 좋은 방법을 사용하세요. 개인 노트이므로 내용은 중구난방.. 의식의 흐름...

이게 뭐냐?

https://www.google.co.kr/search?q=hadoop+streaming&oq=hadoop+streaming&aqs=chrome..69i57j69i60j0j69i59j0l2.2951j0j1&sourceid=chrome&ie=UTF-8

stdin, stdout을 이용하여 MR을 짤 수가 있다. 쉽다. 자바를 몰라도 된다. 간단한 MR은 다 할 수 있다. 파이썬, C, 자바 뭐든 된다. C를 쓰고 싶은데 파이썬이 너무 편해서 나는 파이썬, 그리고 mawk를 사용한다. 원래 쉬운데, 인터넷에 검색도 잘 된다.

awk, mawk

맵퍼는 awk, cat도 쓸 수 있다. 예를 들어 다음 코드는 한 줄의 1번째 컬럼을 키로, 3번째 컬럼을 값으로 맵한다.

awk -F'\t' '{ print $1,$3 }'

awk 사용법을 숙지하면 어지간한 매핑 작업은 거의 다 할 수 있다. awk는 리눅스에서 기본적으로 gawk를 사용한다. 그런데 이게 좀 느리다. mawk를 검색하여 빌드하여 사용하면 이쪽이 빠르다. 왠만하면 mawk를 사용하도록 한다.

문자열 delimiter를 하이브에서 주로 쓰는 캐릭터인 아스키코드 1번 문자로 하고 싶다면 다음과 같이 한다.

awk -F'\001' '{ print $1,$3 }'

복잡한 매핑작업을 하고 싶다면 awk 파일을 생성하고 다음과 같이 한다. 예제는 인터넷에 많다.

awk -f mywork.awk

매핑할 일이 없다면 바로 cat을 쓰면 되고, uniq, wc 명령어도 유용하다.

files 옵션

mawk와 mawk가 사용할 awk 스크립트 파일, 파이썬 파일, 기타 파일들은 옵션 -files로 지정해야 한다. 공백없이 컴마로 구분하여 지정한다. 인덱스나 기타 간단한 참조 데이터가 필요하다면 일일이 MR을 돌리지 말고 바로 files로 지정하고, 파이썬 등에서 읽어 사용하는 것이 편하다. files로 지정한 파일들은 모두 데이터 노드에 복사/배포된다.

join

join을 처리하고 싶다면 두 파일의 파일명을 알아야 한다. 또는 특정 파일에만 존재하는 특성값을 이용할 수 있다. 두 파일을 이용한 join을 처리하고 싶다면 파이썬을 추천한다. 하둡 스크리밍이 할당한 파일명을 알고 싶다면 다음 변수를 사용할 수 있다.

src = os.environ['mapreduce_map_input_file']

이 역시 검색하면 많이 나온다. 요약은 이렇다. A라는 파일의 1번컬럼과 B라는 파일의 2번 컬럼을 join 하고 싶다면 위의 환경변수 src를 이용하여 A인 경우 1번 컬럼을 키로, B인 경우 2번 컬럼을 키로 매핑하면 된다. 이렇게 매핑한 내용을 리듀서 단계에서 합쳐주면 join이 된다.

특정 파일에만 존재하는 특성값을 이용하고 싶다면 이렇게 할 수 있다. A라는 파일의 필드 갯수가 10개이고 B라는 파일의 필드 갯수가 9개라면 필드 갯수를 이용하여 파일을 식별하고 앞에서 쓴 방법으로 join을 시행하면 된다. awk 만으로 join 할 수 있는 방법을 찾아보았으나 애석하게도 특성값을 이용한 방법만 가능한 것 같다.

로컬 테스트

하둡 스트리밍은 로컬에서 선빵 테스트하는 것이 쉽다. 하둡 스트리밍에 올리기 전에 아래와 같은 테스트도 가능하다. 여기서는 sort가 리듀서 역할이다.

cat data | awk -F'\t' '{ print $1, $3}' | sort -k1 -n -r

기본 리듀서

리듀서를 지정하지 않으면 소팅만 된다. 리듀서를 0으로 설정한다면 소팅 조차 되지 않을 것이다.

reduce 100% ?

리듀싱 작업이 막바지에 끝나지 않는다면 여러 개의 리듀서 중 일부가 뺑뺑이 치고 있는 것이다. 이는 주로 키 분포가 편중되서 일어난다. 특정 키에 값이 몰렸고, 특정 리듀서가 홀로 대용량을 처리하고 있기 때문이다. 보통은 널값, 공백등 매직넘버성 키를 거르지 못했을 때 일어난다.

python에서 reduce를 쉽게 하려면

set 자료구조를 사용하면 자연스럽게 중복 제거도 되고 리스트 전환이나 속도 상에서도 이롭다. 자료구조 전략은 컴퓨터 아키텍쳐를 잘 알 수록 유리하다. 예를 들어 해시맵은 사실 O(1)이 아니다. 파이썬 기본 자료 구조를 사용하면 캐시 힛팅률이 좋지 않다. 벡터(배열) 근본주의자는 흔히 쓰는 딕셔너리가 단일 머신에서 대용량을 처리하기에는 좋지 않을 것이라고 생각한다. 사실 MR을 처리하면서 캐시 힛팅률을 꼭 높일 필요가 없을 것이다. 바틀넥을 잡아야지 엉뚱한 곳을 쥐어짤 필요는 없다. 통상적으로 판단한다면, 리듀서에서 다룰 객체 한 개가 저장할 데이터가 많지 않은 경우 자료형은 그다지 고민할 대상이 아니다.

한 방에 많이 할래, 나눠서 할래

이것은 데이터 분석 전략이다. MR 횟수를 줄이려면 한 번에 많은 작업을 해야 한다. mapreduce_map_input_file 환경 변수를 이용하여 다양한 hdfs 경로 상의 소스를 각각 구분하여 처리할 수 있다. 그런데 이게 꼭 좋은 것은 아니다. 데이터가 크다면 한 번에 여러 절차의 처리를 하지 말고, 일단 요약 MR을 시행한 후, 축소한 데이터를 대상으로 본 작업에 들어가는 것이 더 빠른 경향을 보인다. MR의 횟수는 늘어나지만 전체 속도는 개선될 수 있다. 이따금 job이 실패할 때에도 절차적으로 나눠서 일을 진행했다면 실패하기 전까지의 단계를 건질 수도 있다.

참조 파일이 필요하고 그 파일이 작은 크기라면 그냥 input 옵션으로 참조 파일을 복사하고 open 하여 읽어 쓰는 것이 빠르다. 작은 파일을 다루기 위해 일일이 MR 자원을 사용하는 것은 낭비다. 심지어 하둡 스트리밍을 사용하지 않고 HDFS에서 꺼내서 처리하고 다시 넣는 것이 빠를 수도 있다. 어느 전략을 취할 지는 경험적, 짬밥이 도움이 된다. 은퇴때까지 코딩하고 싶어요!

하이브 연동, hive

하이브에서 다루는 데이터의 HDFS상 실제 경로는 hive 콘솔에서 다음 명령어로 확인할 수 있다.

desc formatted [table 이름]

이때 delimiter도 확인하여 매퍼에서 사용한다. 파티션은 각각의 경로로 되어 있으며 HDFS의 구조를 탐험하고 온 다음 정확히 취득할 수 있다.

하이브에서 테이블을 정의할 때, 익스터널 테이블을 사용하면 하둡 스트리밍의 결과를 바로 하이브에서 조회할 수 있다. 단, 파티션을 사용하는 경우 파티션이 취하는 경로 모양을 그대로 모사하여 HDFS상에 저장해야 하며, 다음 명령어로 파티션이 추가됐음을 하이브에게 알려야한다.

echo "msck repair table dmp.dmp_seg_mapped;" | hv

위에서 hv는 각자가 사용하는 하이브 쉘 (예:beeline)의 alias 이름이다. msck 명령어보다는 alter 명령어를 사용하는 것이 정석이나, 이 때에는 value도 구제적으로 적어줘야 하는 단점이 있다.

데이터를 임팔라에서도 열람해야 한다면, 임팔라가 갱신된 데이터를 접근할 수 있도록 다음 명령어를 시행한다.

INVALIDATE METADATA dmp.dmp_seg_mapped

마찬가지로 이 명령어도 각자가 사용하는 임팔라 쉘에 파이프나 기타 방법으로 주입할 수 있으며, bash 스크립트에 의해 절차적으로 처리 가능하다.

input

하둡 스트리밍에서 input 파일은 매우 큰 길이의 문자열이 될 가능성이 있다. 예를 들어 100일간의 데이터를 처리하면서, 이 데이터가 1일단위로 파티셔닝 되어 있다면 각각의 경로를 보유한다. 따라서 input 파일의 경로 문자열을 생성하는 파이썬 스크립트 등을 작성하는 것이 편하다. 에를 들면 이런 파이썬 스크립트를 작성하는 것이다.

input_path_generator.py [기본경로] [기간] > inputs.data

하둡 스트리밍 input에 넣을 때에는 다음과 같이 처리하면 된다. bash를 다루는 사람은 알겠지만 역홑따옴표를 사용했음을 주의한다.

-input `cat inputs.data`

전체 모양새

전체 모양새는 이런 구성을 취할 수 있다.

hadoop-streaming-2.6.0-cdh5.5.1.jar \
  -D dfs.replication=2 -D mapreduce.job.queuename=COMMON \
  -D mapreduce.job.reduces=128 \
  -files mawk,last.awk,r_last_mid.py \
  -input `cat inputs.data` \
  -output my_result \
  -mapper 'mawk -f last.awk' \
  -reducer 'r_last_mid.py'

예외 처리

bash 쉘 스크립트로 일련의 작업을 절차적으로 처리할 때, 중간에 발생하는 에러에 대해 예외 처리를 하고 싶을 수 있다. 이 때는 exit 코드를 활용할 수 있다. 다음 코드는 앞에서 실행한 작업의 exit 코드를 출력하며 정상=0이 아닌 경우를 예외 처리 트리거로 사용할 수 있다.

echo $?

실제적으로는 다음과 같이 처리할 수 있다. 링크된 글의 run, die 함수가 적절하다. http://stackoverflow.com/questions/6109225/bash-echoing-the-last-command-run

물론 더 깊숙한 예외 처리를 하고 싶다면 리눅스에서 흔히 사용하는 pid lock 파일 (로컬) 그리고 HDFS 경로상에 익스터널 테이블을 연동하여 status 관리를 하는 것이다. 히스토리 관리까지 할 수도 있다. atomic 문제가 염려된다면 조금 더 신경을 써야 한다. 하둡 인터페이스 서버가 외부와 통신할 수 있다면 curl 등을 이용하여 적극적으로 외부에 트리거를 때릴 수 있다. 나는 자바, 하둡, 빅데이터 생태계를 잘 아는 사람이 아니므로 더 풍부한 이벤트, 트리거, 메시징 툴킷에 대해서는 모르므로 각자의 최선을 다해 처리하되, 직접 만드는 것이 편한 사람은 앞서 설명한 방법을 쓸 수 있다.

주섬주섬 노트했는데.. 참 별거 없구나.

끗.





공유하기













[t:/] is not "technology - root". dawnsea, rss